package com.jie.flink.cdc.mycdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

/**
 * @author alanchan

DROP TABLE IF EXISTS emp;
CREATE TABLE emp (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(255) ,
`age` INTEGER ,
`gender` INTEGER,
`address` VARCHAR(512)
);

 */
public class TestJdbcDemoThree {

	/**
	 * Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink
	 * SHOW VARIABLES LIKE '%time_zone%'
	 *
	 * SET GLOBAL time_zone = '+8:00';		-- 以后每次连接mysql都是东八区
	 * SET time_zone = '+8:00';			-- 只有此次连接mysql是东八区，下次连接mysql又回到系统默认时区
	 *
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		//只有开启checkpoint 才能实时同步数据的变化。否则只能是一次性全量同步
		env.enableCheckpointing(3000);

		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		String createTableSql = "CREATE TABLE b (\n" +
				"  id INT NOT NULL ,\n" +
				"  name string,\n" +
				"  age int,\n" +
				"  gender int,\n" +
				"  address string\n" +
				") WITH (\n" +
				"   'connector' = 'mysql-cdc',\n" +
				"   'hostname' = 'localhost',\n" +
				"   'port' = '3306',\n" +
				"   'username' = 'root',\n" +
				"   'password' = 'root',\n" +
				"   'scan.incremental.snapshot.chunk.key-column' = 'id',\n" +
				"   'database-name' = 'test',\n" +
				"   'table-name' = 't_user'\n" +
				");";

		tenv.executeSql(createTableSql);

		String createTableSqlaa = "CREATE TABLE aa (\n" +
				"  id INT NOT NULL ,\n" +
				"  name string,\n" +
				"  age int,\n" +
				"  gender int,\n" +
				"  address string\n" +
//				"  PRIMARY KEY (id) NOT ENFORCED" +
				") WITH (\n" +
				"   'connector' = 'mysql-cdc',\n" +
				"   'hostname' = 'localhost',\n" +
				"   'port' = '3306',\n" +
				"   'username' = 'root',\n" +
				"   'password' = 'root',\n" +
				"   'scan.incremental.snapshot.chunk.key-column' = 'id',\n" +
				"   'database-name' = 'test_copy',\n" +
				"   'table-name' = 't_user'\n" +
				")";

		tenv.executeSql(createTableSqlaa);

		String sql = " insert into aa SELECT * FROM b";
		tenv.executeSql(sql);
	}

}
